feat: page-bounded Arrow decoder per data page (PR-6a.2) #6407
Draft
g-talbot wants to merge 4 commits into
Bridges PR-4's ColumnPageStream (raw compressed pages in storage order) to arrow's standard ParquetRecordBatchReaderBuilder (decoded arrays). PR-6's streaming merge engine drains each input row group through this to keep per-RG memory bounded — only one input RG's worth of bytes is materialised at a time, rather than the whole file.

Approach: reconstruct one row group's column-chunk byte layout in a buffer (column chunks placed at their original offsets, gaps zero-padded), wrap the buffer in `Bytes`, and feed it to `ParquetRecordBatchReaderBuilder::new_with_metadata` with `with_row_groups([rg_idx])`.

Byte-exact reconstruction comes from carrying each page's original Thrift-compact `header_bytes` through PR-4's streaming reader — no re-encoding, so encoder-version drift inside the compactor cannot silently corrupt outputs. Adds `header_bytes: Bytes` to `Page` and captures the drained header bytes inside `parse_page_header_streaming`.

A new `StreamDecoder` borrows the stream and exposes `next_rg()`, which returns one `RecordBatch` per input row group and is idempotent at EOF.

Tests (9, all passing): single-RG and multi-RG drains, multi-page columns, dict columns, null preservation, compression codec roundtrip (uncompressed/snappy/zstd — LZ4 is not enabled in our parquet feature set), idempotent EOF, byte-exact reconstruction proof, and I/O failures surfacing as `PageDecodeError::PageStream` rather than being masked as decode errors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
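A hedged sketch of that reconstruction path using parquet-rs's public builder API; `pages` and `reconstructed_len` are illustrative inputs, not the PR's actual types:

```rust
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
use parquet::errors::Result;

// Rebuild one row group's column-chunk bytes at their original file offsets
// (gaps zero-padded) and decode it with the standard arrow reader.
fn decode_rg(
    pages: &[(usize, Bytes)], // (original file offset, raw page bytes incl. header)
    reconstructed_len: usize, // one past the last byte offset of this row group
    metadata: ArrowReaderMetadata,
    rg_idx: usize,
) -> Result<Vec<RecordBatch>> {
    let mut buf = vec![0u8; reconstructed_len];
    for (offset, bytes) in pages {
        buf[*offset..*offset + bytes.len()].copy_from_slice(bytes);
    }
    // Bytes implements ChunkReader, so the builder can read directly from it.
    let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(Bytes::from(buf), metadata)
        .with_row_groups(vec![rg_idx])
        .build()?;
    reader.map(|batch| batch.map_err(Into::into)).collect()
}
```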
CI nightly rustfmt (newer than my local at the time of the original push) wraps `write_parquet(...)` onto multiple lines. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces PR-6a's per-RG fat-buffer approach. The previous implementation reconstructed a whole row group's column-chunk bytes into a single buffer and fed it to ParquetRecordBatchReaderBuilder — peak memory was RG-sized (tens to hundreds of MB per call). This rewrite is page-bounded.

API change: `StreamDecoder::next_rg() -> Option<RecordBatch>` is replaced by `decode_next_page() -> Option<DecodedPage>`. Each call returns one input data page's worth of decoded rows as an `ArrayRef`, plus `(rg_idx, col_idx, page_idx_in_col, row_start)` indexing so PR-6b's merge engine can slice take indices per page. Dictionary pages are absorbed silently (fed to the column reader for subsequent data-page decoding); INDEX_PAGE is skipped.

Memory at any time:
- one in-flight page (compressed + decompressed bytes)
- one cached dictionary page per (rg, col) when dict-encoded
- one column reader per (rg, col) with small bookkeeping (level decoders, value decoder)

It does NOT buffer the row group, a column chunk, or a materialised RecordBatch.

Implementation: wraps parquet-rs's public `GenericColumnReader` over a per-(rg, col) PageQueue that we feed one page at a time. Page → ColumnPage conversion handles decompression (via `compression::create_codec`, which required enabling parquet's `experimental` feature in our Cargo.toml — the API has been stable across recent parquet-rs versions, just not yet de-experimentalised), `format::Encoding` (Thrift wrapper) → `basic::Encoding` translation, and DataPageV2's uncompressed-levels-then-compressed-values layout.

Array builders cover the production schema: Boolean, Int8/16/32/64, UInt8/16/32/64, Float32/64, Utf8/LargeUtf8/Binary/LargeBinary, and `List<non-nullable primitive>` (DDSketch `keys` / `counts`). Dict columns decode to their value type (Utf8/Binary); the merge engine's union schema normalises strings to Utf8 anyway, and the output writer re-applies dict encoding based on observed cardinality.

Tests (9, all passing):
- single-RG and multi-RG round-trip (per-column comparison vs. the canonical arrow reader)
- per-page indexing (`row_start`, `page_idx_in_col` monotonic per (rg, col))
- idempotent EOF
- nullable column (`service` with nulls every 5th row)
- compression codecs (uncompressed, snappy, zstd)
- I/O failures surface as `PageDecodeError::PageStream`
- `List<UInt64>` (DDSketch `counts`) with variable list lengths, including an empty list and `u64::MAX`
- structural page-bounded contract: PageQueue depth ≤ 2 (one queued dictionary plus the current data page) across a long stream

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
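For orientation, a minimal sketch of what `DecodedPage` carries; the four index fields are named in the commit message above, while the `array` field name and exact types are assumptions:

```rust
use arrow::array::ArrayRef;

// Sketch only: field names beyond the four indices are illustrative.
pub struct DecodedPage {
    pub rg_idx: usize,          // input row group index
    pub col_idx: usize,         // leaf column index within the row group
    pub page_idx_in_col: usize, // data page ordinal within the column chunk
    pub row_start: usize,       // row offset of this page within its (rg, col)
    pub array: ArrayRef,        // decoded values for exactly this data page
}
```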
CI's `cargo +nightly fmt --check` flags a single trailing blank line at end of file. No functional change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
- `StreamDecoder::decode_next_page()` returns one `DecodedPage` per call (rg_idx, col_idx, page_idx, row_start, ArrayRef) instead of materialising an entire row group at a time.
- PR-6b's merge engine consumes `DecodedPage`s in storage order (row-group-major, column-major-within-rg, page-major-within-col), applies merge plan slicing per page, and streams output pages directly into the writer without column-chunk staging.
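A hedged sketch of the consumption loop, assuming an async `decode_next_page` returning `Result<Option<DecodedPage>, PageDecodeError>` (consistent with the idempotent-EOF tests); `MergeEngine` and `apply_page_slice` are illustrative stand-ins for PR-6b:

```rust
// Pages arrive in storage order: row-group-major, column-major within the
// row group, page-major within the column. One page in flight at a time.
async fn drain(
    decoder: &mut StreamDecoder,
    merge: &mut MergeEngine, // hypothetical PR-6b consumer
) -> Result<(), PageDecodeError> {
    while let Some(page) = decoder.decode_next_page().await? {
        // Slice the merge plan's take indices to this page's row window
        // [row_start, row_start + array.len()), then stream output directly.
        merge.apply_page_slice(&page)?;
    }
    Ok(()) // subsequent calls keep returning Ok(None)
}
```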
How it works
- Pull the next `Page` from the underlying `ColumnPageStream`. Skip `INDEX_PAGE` (historical Thrift variant, not emitted by production writers).
- Maintain a per-(rg, col) `PageQueue` that feeds parquet-rs's `ColumnReader` one page at a time, plus a counter tracking rows decoded so far.
- Convert our `Page` to parquet-rs's `column::page::Page` enum: decompress via `parquet::compression::create_codec` (requires the `experimental` feature), translate `format::Encoding` (Thrift wrapper) → `basic::Encoding` (Rust enum) via a manual i32 match (no public conversion in parquet-rs; sketched after this list), and drop optional statistics.
- Drive the `ColumnReader` to decode exactly `header.num_values` records via `read_records(...)` calls in a loop, pulling values + def/rep levels into typed buffers.
- Build the `ArrayRef` from `(values, def_levels, rep_levels)` per the column's parquet physical type. Emit `DecodedPage`.
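The encoding translation is small; a hedged reconstruction follows. The numeric values are fixed by the Parquet Thrift spec, but `PageDecodeError::UnsupportedEncoding` is an illustrative error variant, not necessarily the PR's:

```rust
use parquet::basic::Encoding;
use parquet::format;

// format::Encoding is a thrift-generated newtype over i32; basic::Encoding is
// the idiomatic Rust enum the column reader expects.
fn translate_encoding(e: format::Encoding) -> Result<Encoding, PageDecodeError> {
    Ok(match e.0 {
        0 => Encoding::PLAIN,
        2 => Encoding::PLAIN_DICTIONARY,
        3 => Encoding::RLE,
        4 => Encoding::BIT_PACKED,
        5 => Encoding::DELTA_BINARY_PACKED,
        6 => Encoding::DELTA_LENGTH_BYTE_ARRAY,
        7 => Encoding::DELTA_BYTE_ARRAY,
        8 => Encoding::RLE_DICTIONARY,
        9 => Encoding::BYTE_STREAM_SPLIT,
        other => return Err(PageDecodeError::UnsupportedEncoding(other)),
    })
}
```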
Type coverage
- Flat physical types: `Boolean`, `Int8/16/32` + `UInt8/16/32` (parquet `Int32` with logical annotation), `Int64`/`UInt64` (parquet `Int64`), `Float32`, `Float64`, `Utf8`/`LargeUtf8`/`Binary`/`LargeBinary` (parquet `ByteArray`). Dictionary-encoded pages are decoded via the cached dict page → values pipeline.
- `List<T>`/`LargeList<T>` where outer + inner are non-nullable and inner is a flat primitive — covers DDSketch `keys` (`List<Int16>`) and `counts` (`List<UInt64>`). Dremel def/rep levels (max_def = 1, max_rep = 1) are decoded via the same `read_records` path; arrow offsets are computed via `list_offsets_from_levels` (sketched below).
- Other nested shapes (nullable list inner/outer, `Struct`, `Map`, `FixedSizeList`, multi-leaf nested) return an unsupported-type error rather than silently falling back to a different mechanism.
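For the supported list shape (non-nullable outer and inner, max_def = 1, max_rep = 1), offset computation reduces to a single pass over the levels. A hedged reconstruction of what `list_offsets_from_levels` likely does; the real helper's signature may differ:

```rust
// For non-nullable List<primitive> with max_def = 1 and max_rep = 1:
//   rep_level == 0 marks the start of a new list entry;
//   def_level == 1 means a real value, def_level == 0 an empty list.
fn list_offsets_from_levels(def_levels: &[i16], rep_levels: &[i16]) -> Vec<i32> {
    let mut offsets = Vec::with_capacity(def_levels.len() + 1);
    let mut values_so_far: i32 = 0;
    for (&def, &rep) in def_levels.iter().zip(rep_levels) {
        if rep == 0 {
            offsets.push(values_so_far); // new list starts here
        }
        if def == 1 {
            values_so_far += 1; // this level carries an actual value
        }
    }
    offsets.push(values_so_far); // closing offset
    offsets
}
```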
Sync ⇄ async bridging
The page stream is async (S3 reads); `PageReader` (from parquet-rs) is sync. Bridged via an `Arc<Mutex<VecDeque<ColumnPage>>>` per (rg, col): `decode_next_page` pulls from the stream (async) and pushes onto the queue; the sync `PageReader` impl then pops from the queue when the `ColumnReader` asks for the next page. `peek_next_page` and `skip_next_page` are implemented to support the parquet-rs reader's state machine.
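A hedged sketch of the sync side of that bridge; the `PageMetadata` field names match recent parquet-rs versions and may differ in older ones, and the real PR's queue element type is its own `ColumnPage`:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;

// Shared queue: the async side pushes pages it pulls from the stream; the
// sync PageReader impl pops them when the ColumnReader asks for the next page.
#[derive(Clone)]
struct PageQueue(Arc<Mutex<VecDeque<Page>>>);

impl Iterator for PageQueue {
    type Item = Result<Page>;
    fn next(&mut self) -> Option<Self::Item> {
        self.0.lock().unwrap().pop_front().map(Ok)
    }
}

impl PageReader for PageQueue {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        Ok(self.0.lock().unwrap().pop_front())
    }
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        Ok(self.0.lock().unwrap().front().map(|p| PageMetadata {
            num_rows: None,
            num_levels: Some(p.num_values() as usize),
            is_dict: matches!(p, Page::DictionaryPage { .. }),
        }))
    }
    fn skip_next_page(&mut self) -> Result<()> {
        self.0.lock().unwrap().pop_front();
        Ok(())
    }
}
```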
Schema handling
`parquet_to_arrow_schema(parquet_schema, None)` bypasses the ARROW:schema hint that would otherwise force Dictionary types — input parquet files written from arrow declare Dictionary columns in ARROW:schema metadata, but their page-encoded values are plain values that decode to `StringArray` etc. Decoding without the hint gives consistent flat-primitive output that the merge engine then interleaves.
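Concretely, a minimal sketch of that call, assuming a parquet-rs `ParquetMetaData` is in hand:

```rust
use arrow::datatypes::Schema;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::ParquetMetaData;

fn decode_schema(meta: &ParquetMetaData) -> parquet::errors::Result<Schema> {
    // Passing None for key-value metadata skips the embedded ARROW:schema
    // hint, so Dictionary-typed columns come out as their flat value types.
    parquet_to_arrow_schema(meta.file_metadata().schema_descr(), None)
}
```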
Tests
9 tests, all passing:
- `test_drain_single_rg_round_trip`, `test_drain_multi_rg_round_trip` — full round-trip via `decode_next_page` matches `ParquetRecordBatchReaderBuilder`.
- `test_decoded_page_row_indexing` — `row_start` correctly tracks per-(rg, col) row offsets.
- `test_eof_idempotent` — repeated calls after EOF stay `Ok(None)`.
- `test_nullable_column_round_trip` — def-level decoding for nullable cols.
- `test_compression_codecs` — snappy, gzip, zstd round-trip.
- `test_page_bounded_queue_depth` — verifies the internal page queue depth stays ≤ 2 across a long stream (the page-bounded contract).
- `test_list_uint64_round_trip` — `List<UInt64>` (DDSketch shape) round-trip.
- `test_io_failure_surfaces_as_page_stream_error` — body GET failures propagate as `PageStream(Io)`, not masked as decode errors.
Stack
- Base: `gtt/column-page-stream-trait` (PR-5a #6406).
- PR-6b (#6409) builds the streaming merge engine on top of this decoder.